/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.caching.manager;

import io.iec.edp.caf.caching.properties.CacheSetting;
import io.iec.edp.caf.caching.properties.JedisPoolConfigProperties;
import io.iec.edp.caf.caching.properties.RedisCacheProperties;
import io.iec.edp.caf.caching.serializer.KeyStringSerializer;
import io.iec.edp.caf.caching.enums.RedisServerMode;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisPoolConfig;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * RedisClient管理器
 *
 * @author guowenchang
 * @date 2020-05-07
 */
public class RedisClientManager {

    private Map<String, RedisClient> pools;
    private Lock lock;
    private JedisClientConfiguration clientConfig;

    private static RedisSerializer defaultValueSerializer = new StringRedisSerializer();
    private static KeyStringSerializer keyStringSerializer = new KeyStringSerializer();
    private static GenericJackson2JsonRedisSerializer hashValueSerializer = new GenericJackson2JsonRedisSerializer();

    /**
     * 传入JedisPool相关配置 并构建clientConfig
     *
     * @param jedisPoolConfigProperties
     */
    public RedisClientManager(JedisPoolConfigProperties jedisPoolConfigProperties, RedisCacheProperties settings) {
        this.pools = new HashMap<>();
        this.lock = new ReentrantLock();
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(jedisPoolConfigProperties.getMaxTotal());
        poolConfig.setMaxIdle(jedisPoolConfigProperties.getMaxIdle());
        poolConfig.setMaxWaitMillis(jedisPoolConfigProperties.getMaxWaitMillis());
        poolConfig.setMinIdle(jedisPoolConfigProperties.getMinIdle());
        poolConfig.setTestOnBorrow(jedisPoolConfigProperties.isTestOnBorrow());
        poolConfig.setTestOnReturn(jedisPoolConfigProperties.isTestOnReturn());
        poolConfig.setTestWhileIdle(jedisPoolConfigProperties.isTestWhileIdle());
        this.clientConfig = JedisClientConfiguration.builder()
                .usePooling().poolConfig(poolConfig).and().readTimeout(Duration.ofMillis(jedisPoolConfigProperties.getMaxWaitMillis())).build();

        initRedisClients(settings, this.clientConfig);
    }

    private void initRedisClients(RedisCacheProperties settings, JedisClientConfiguration clientConfig) {
        if (settings != null && settings.getCacheSettings() != null) {
            for (CacheSetting cacheSetting : settings.getCacheSettings()) {
                String key = cacheSetting.getMode() == RedisServerMode.STANDALONE ?
                        String.format("%s-%s-%s", cacheSetting.getHost(), cacheSetting.getPort(), cacheSetting.getDatabase()) :
                        String.format("%s-%s", cacheSetting.getMode(), cacheSetting.getClusterNodes());

                if (!this.pools.containsKey(key)) {
                    try {
                        lock.lock();

                        //读取配置并新建redisTemplate实例
                        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
                        JedisConnectionFactory jedisConnectionFactory = null;

                        if (!this.pools.containsKey(key)) {
                            switch (cacheSetting.getMode()) {
                                case SENTINEL:
                                    RedisSentinelConfiguration redisSentinelConfiguration = new RedisSentinelConfiguration();
                                    redisSentinelConfiguration.setSentinels(cacheSetting.getClusterNodes());
                                    redisSentinelConfiguration.setMaster(cacheSetting.getMaster());
                                    redisSentinelConfiguration.setPassword(cacheSetting.getPassword());
                                    jedisConnectionFactory = new JedisConnectionFactory(redisSentinelConfiguration, clientConfig);
                                    jedisConnectionFactory.afterPropertiesSet();
                                    break;
                                case CLUSTER:
                                    RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
                                    redisClusterConfiguration.setClusterNodes(cacheSetting.getClusterNodes());
                                    redisClusterConfiguration.setMaxRedirects(cacheSetting.getMaxRedirects());
                                    redisClusterConfiguration.setPassword(cacheSetting.getPassword());
                                    jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration, clientConfig);
                                    jedisConnectionFactory.afterPropertiesSet();
                                    break;
                                case STANDALONE:
                                    RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
                                    redisStandaloneConfiguration.setHostName(cacheSetting.getHost());
                                    redisStandaloneConfiguration.setPort(cacheSetting.getPort());
                                    redisStandaloneConfiguration.setPassword(cacheSetting.getPassword());
                                    redisStandaloneConfiguration.setDatabase(cacheSetting.getDatabase());
                                    jedisConnectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, clientConfig);
                                    break;
                            }

                            jedisConnectionFactory.afterPropertiesSet();
                            redisTemplate.setConnectionFactory(jedisConnectionFactory);

                            redisTemplate.setEnableDefaultSerializer(false);
                            // 设置值（value）的序列化采用StringRedisSerializer。
                            redisTemplate.setValueSerializer(defaultValueSerializer);
                            redisTemplate.setHashValueSerializer(hashValueSerializer);
                            // 设置键（key）的序列化采用KeyStringSerializer。
                            redisTemplate.setKeySerializer(keyStringSerializer);
                            redisTemplate.setHashKeySerializer(keyStringSerializer);
                            redisTemplate.afterPropertiesSet();

                            RedisClient redisClient = new RedisClient(redisTemplate);
                            redisClient.addMessageListener(new MessageListenerAdapter(), "Default");
                            this.pools.put(key, redisClient);
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    }

    /**
     * 获取RedisTemplate
     */
    public RedisClient getRedisClient(CacheSetting cacheSetting) {
        String key = cacheSetting.getMode() == RedisServerMode.STANDALONE ?
                String.format("%s-%s-%s", cacheSetting.getHost(), cacheSetting.getPort(), cacheSetting.getDatabase()) :
                String.format("%s-%s", cacheSetting.getMode(), cacheSetting.getClusterNodes());

        return this.pools.get(key);
    }
}
